package drds.data_propagate.binlog_event;

import drds.data_propagate.binlog_event.event.*;
import drds.data_propagate.binlog_event.event.mariadb.*;
import drds.data_propagate.binlog_event.event.rows_event.DeleteRowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.RowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.UpdateRowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.WriteRowsEvent;
import drds.data_propagate.driver.packets.GtidSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.BitSet;


public final class BinLogEventDecoder {

    protected static final Log logger = LogFactory.getLog(BinLogEventDecoder.class);

    protected final BitSet handleSet = new BitSet(BinLogEvent.enum_end_event);

    public BinLogEventDecoder() {
    }

    public BinLogEventDecoder(final int fromIndex, final int toIndex) {
        handleSet.set(fromIndex, toIndex);
    }

    public BinLogEvent decode(Buffer buffer, Header header, BinLogEventsContext binLogEventsContext) throws IOException {
        FormatDescriptionEvent formatDescriptionEvent = binLogEventsContext.getFormatDescriptionEvent();
        BinlogEventPosition binlogEventPosition = binLogEventsContext.getBinlogEventPosition();

        int checkSumAlg = BinLogEvent.binlog_check_sum_alg_undef;
        if (header.getEventType() != BinLogEvent.format_description_event) {
            checkSumAlg = formatDescriptionEvent.header.getChecksumAlg();
        } else {
            // 如果是format事件自己，也需要处理checksum
            checkSumAlg = header.getChecksumAlg();
        }

        if (checkSumAlg != BinLogEvent.binlog_checksum_alg_off
                && checkSumAlg != BinLogEvent.binlog_check_sum_alg_undef) {
            // remove checksum bytes
            buffer.newLimit(header.getEventLength() - BinLogEvent.binlog_checksum_len);
        }

        GtidEvent gtidEvent = binLogEventsContext.getGtidEvent();
        // GtidSet gtidSet = binLogEventsContext.getGtidSet();
        switch (header.getEventType()) {
            case BinLogEvent.query_event: {
                QueryEvent queryEvent = new QueryEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return queryEvent;
            }
            case BinLogEvent.xid_event: {
                XidEvent xidEvent = new XidEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return xidEvent;
            }
            case BinLogEvent.table_map_event: {
                TableMapEvent tableMapEvent = new TableMapEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                binLogEventsContext.getTableIdToTableMapEventMap().put(tableMapEvent.getTableId(), tableMapEvent);
                return tableMapEvent;
            }
            case BinLogEvent.write_rows_event_v1: {
                RowsEvent rowsEvent = new WriteRowsEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                rowsEvent.setTableMapInfo(binLogEventsContext);
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return rowsEvent;
            }
            case BinLogEvent.update_rows_event_v1: {
                RowsEvent rowsEvent = new UpdateRowsEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                rowsEvent.setTableMapInfo(binLogEventsContext);
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return rowsEvent;
            }
            case BinLogEvent.delete_rows_event_v1: {
                RowsEvent rowsEvent = new DeleteRowsEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                rowsEvent.setTableMapInfo(binLogEventsContext);
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return rowsEvent;
            }
            case BinLogEvent.rotate_event: {
                RotateEvent rotateEvent = new RotateEvent(header, buffer, formatDescriptionEvent);
                //在这里进行设置
                /* updating readedIndex in context */
                binlogEventPosition = new BinlogEventPosition(rotateEvent.getFilename(), rotateEvent.getPosition());
                binLogEventsContext.setBinlogEventPosition(binlogEventPosition);
                return rotateEvent;
            }
            case BinLogEvent.load_event:
            case BinLogEvent.new_load_event: {
                LoadEvent loadEvent = new LoadEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return loadEvent;
            }
            case BinLogEvent.slave_event: /* can never happen (unused event) */ {
                if (logger.isWarnEnabled())
                    logger.warn("Skipping unsupported SLAVE_EVENT from: " + binLogEventsContext.getBinlogEventPosition());
                break;
            }
            case BinLogEvent.create_file_event: {
                CreateFileEvent createFileEvent = new CreateFileEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return createFileEvent;
            }
            case BinLogEvent.append_block_event: {
                AppendBlockEvent appendBlockEvent = new AppendBlockEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return appendBlockEvent;
            }
            case BinLogEvent.delete_file_event: {
                DeleteFileEvent deleteFileEvent = new DeleteFileEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return deleteFileEvent;
            }
            case BinLogEvent.exec_load_event: {
                ExecuteLoadEvent executeLoadEvent = new ExecuteLoadEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return executeLoadEvent;
            }
            case BinLogEvent.start_event_v3: {
                /* This is sent only by MySQL <=4.x */
                StartEventV3 startEventV3 = new StartEventV3(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return startEventV3;
            }
            case BinLogEvent.stop_event: {
                StopEvent stopEvent = new StopEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return stopEvent;
            }
            case BinLogEvent.intvar_event: {
                IntvarEvent intvarEvent = new IntvarEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return intvarEvent;
            }
            case BinLogEvent.rand_event: {
                RandEvent randEvent = new RandEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return randEvent;
            }
            case BinLogEvent.user_var_event: {
                UserVarEvent userVarEvent = new UserVarEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return userVarEvent;
            }
            case BinLogEvent.format_description_event: {
                formatDescriptionEvent = new FormatDescriptionEvent(header, buffer, formatDescriptionEvent);
                binLogEventsContext.setFormatDescriptionEvent(formatDescriptionEvent);
                return formatDescriptionEvent;
            }
            case BinLogEvent.pre_ga_write_rows_event: {
                if (logger.isWarnEnabled())
                    logger.warn("Skipping unsupported PRE_GA_WRITE_ROWS_EVENT from: " + binLogEventsContext.getBinlogEventPosition());
                // ev = new Write_rows_log_event_old(buf, event_len,
                // description_event);
                break;
            }
            case BinLogEvent.pre_ga_update_rows_event: {
                if (logger.isWarnEnabled())
                    logger.warn("Skipping unsupported PRE_GA_UPDATE_ROWS_EVENT from: " + binLogEventsContext.getBinlogEventPosition());
                // ev = new Update_rows_log_event_old(buf, event_len,
                // description_event);
                break;
            }
            case BinLogEvent.pre_ga_delete_rows_event: {
                if (logger.isWarnEnabled())
                    logger.warn("Skipping unsupported PRE_GA_DELETE_ROWS_EVENT from: " + binLogEventsContext.getBinlogEventPosition());
                // ev = new Delete_rows_log_event_old(buf, event_len,
                // description_event);
                break;
            }
            case BinLogEvent.begin_load_query_event: {
                BeginLoadQueryEvent beginLoadQueryEvent = new BeginLoadQueryEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return beginLoadQueryEvent;
            }
            case BinLogEvent.execute_load_query_event: {
                ExecuteLoadQueryEvent executeLoadQueryEvent = new ExecuteLoadQueryEvent(header, buffer,
                        formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return executeLoadQueryEvent;
            }
            case BinLogEvent.incident_event: {
                IncidentEvent incidentEvent = new IncidentEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return incidentEvent;
            }
            case BinLogEvent.heartbeat_log_event: {
                HeartbeatEvent heartbeatEvent = new HeartbeatEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return heartbeatEvent;
            }
            case BinLogEvent.ignorable_log_event: {
                IgnorableEvent ignorableEvent = new IgnorableEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return ignorableEvent;
            }
            case BinLogEvent.rows_query_log_event: {
                RowsQueryEvent rowsQueryEvent = new RowsQueryEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return rowsQueryEvent;
            }
            case BinLogEvent.write_rows_event: {
                RowsEvent writeRowsEvent = new WriteRowsEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                writeRowsEvent.setTableMapInfo(binLogEventsContext);
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return writeRowsEvent;
            }
            case BinLogEvent.update_rows_event: {
                RowsEvent rowsEvent = new UpdateRowsEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                rowsEvent.setTableMapInfo(binLogEventsContext);
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return rowsEvent;
            }
            case BinLogEvent.delete_rows_event: {
                RowsEvent rowsEvent = new DeleteRowsEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                rowsEvent.setTableMapInfo(binLogEventsContext);
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return rowsEvent;
            }
            case BinLogEvent.partial_update_rows_event: {
                RowsEvent rowsEvent = new UpdateRowsEvent(header, buffer, formatDescriptionEvent, true);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                rowsEvent.setTableMapInfo(binLogEventsContext);
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return rowsEvent;
            }
            case BinLogEvent.gtid_log_event:
            case BinLogEvent.anonymous_gtid_log_event: {
                GtidEvent newGtidEvent = new GtidEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                GtidSet gtidSet = binLogEventsContext.getGtidSet();
                if (gtidSet != null) {
                    gtidSet.update(newGtidEvent.getGtidString());
                    // update latest gtid
                    header.putGtid(gtidSet, newGtidEvent);
                }
                // update current gtid event decode context
                binLogEventsContext.setGtidEvent(newGtidEvent);
                return newGtidEvent;
            }
            case BinLogEvent.previous_gtids_log_event: {
                PreviousGtidsEvent previousGtidsEvent = new PreviousGtidsEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return previousGtidsEvent;
            }
            case BinLogEvent.transaction_context_event: {
                TransactionContextEvent transactionContextEvent = new TransactionContextEvent(header, buffer,
                        formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return transactionContextEvent;
            }
            case BinLogEvent.view_change_event: {
                ViewChangeEvent viewChangeEvent = new ViewChangeEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return viewChangeEvent;
            }
            case BinLogEvent.xa_prepare_log_event: {
                XaPrepareEvent xaPrepareEvent = new XaPrepareEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return xaPrepareEvent;
            }
            case BinLogEvent.annotate_rows_event: {
                AnnotateRowsEvent annotateRowsEvent = new AnnotateRowsEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                header.putGtid(binLogEventsContext.getGtidSet(), gtidEvent);
                return annotateRowsEvent;
            }
            case BinLogEvent.binlog_checkpoint_event: {
                BinlogCheckPointEvent binlogCheckPointEvent = new BinlogCheckPointEvent(header, buffer,
                        formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return binlogCheckPointEvent;
            }
            case BinLogEvent.gtid_event: {
                MariaGtidEvent mariaGtidEvent = new MariaGtidEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return mariaGtidEvent;
            }
            case BinLogEvent.gtid_list_event: {
                MariaGtidListEvent mariaGtidListEvent = new MariaGtidListEvent(header, buffer, formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return mariaGtidListEvent;
            }
            case BinLogEvent.start_encryption_event: {
                StartEncryptionEvent startEncryptionEvent = new StartEncryptionEvent(header, buffer,
                        formatDescriptionEvent);
                /* updating readedIndex in context */
                binlogEventPosition.position = header.getNextBinLogEventPosition();
                return startEncryptionEvent;
            }
            default:
                /*
                 * Create an object of Ignorable_log_event for unrecognized sub-class. So that
                 * SLAVE SQL THREAD will only update the readedIndex and continue.
                 */
                if ((buffer.getLittleEndian16UnsignedInt(BinLogEvent.flags_offset)
                        & BinLogEvent.log_event_ignorable_f) > 0) {
                    IgnorableEvent ignorableEvent = new IgnorableEvent(header, buffer, formatDescriptionEvent);
                    /* updating readedIndex in context */
                    binlogEventPosition.position = header.getNextBinLogEventPosition();
                    return ignorableEvent;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skipping unrecognized binlog_event event " + BinLogEvent.getTypeName(header.getEventType())
                                + " from: " + binLogEventsContext.getBinlogEventPosition());
                    }
                }
        }
        /* updating readedIndex in context */
        binlogEventPosition.position = header.getNextBinLogEventPosition();
        /* Unknown or unsupported log event */
        return new UnknownEvent(header);
    }

    //
    public BinLogEvent decode(Buffer buffer, BinLogEventsContext binLogEventsContext) throws IOException {
        final int limit = buffer.limit();

        if (limit >= FormatDescriptionEvent.log_event_header_length) {
            Header header = new Header(buffer, binLogEventsContext.getFormatDescriptionEvent());

            final int eventLength = header.getEventLength();
            if (limit >= eventLength) {
                BinLogEvent binLogEvent;
                if (handleSet.get(header.getEventType())) {
                    buffer.newLimit(eventLength);
                    try {
                        binLogEvent = decode(buffer, header, binLogEventsContext);
                    } catch (IOException e) {
                        if (logger.isWarnEnabled()) {
                            logger.warn("Decoding " + BinLogEvent.getTypeName(header.getEventType()) + " failed from: "
                                    + binLogEventsContext.getBinlogEventPosition(), e);
                        }
                        throw e;
                    } finally {
                        buffer.newLimit(limit); /* Restore newLimit */
                    }
                } else {
                    /* Ignore unsupported binary-log. */
                    binLogEvent = new UnknownEvent(header);
                }

                if (binLogEvent != null) {
                    // set logFileName
                    binLogEvent.getHeader().setLogFileName(binLogEventsContext.getBinlogEventPosition().getFileName());
                    binLogEvent.setSemiValue(buffer.semiValue);
                }

                /* moveEffectiveInitialIndexAndLimit this binary-log. */
                buffer.moveEffectiveInitialIndexAndLimit(eventLength);
                return binLogEvent;
            }
        }

        /* Rewind bytes's readedIndex decode 0. */
        buffer.resetReadedIndex();
        return null;
    }

    public final void handle(final int flagIndex) {
        handleSet.set(flagIndex);
    }

    public final void handle(final int fromIndex, final int toIndex) {
        handleSet.set(fromIndex, toIndex);
    }


}
